/*
 * Copyright (c) 2021-2021, talkweb 拓维信息 www.talkweb.com.cn.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.talkweb.iot.mqtt.broker.service.impl;

import com.talkweb.iot.mqtt.broker.service.IMqttClusterService;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.mica.core.utils.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;

/**
 * MQTT cluster service: applies messages exchanged between cluster nodes to the
 * local {@link MqttServer} and forwards messages to Kafka.
 *
 * @author L.cm
 */
@Slf4j
@Service
public class MqttClusterServiceImpl implements IMqttClusterService {
	@Autowired
	private MqttServer mqttServer;
	@Autowired
	private KafkaTemplate<String, Message> kafkaTemplate;

	/**
	 * Applies a cluster message to the local server, dispatching on its message type.
	 * Message types without a dedicated case below are ignored.
	 *
	 * @param message the message to handle
	 */
	@Override
	public void exchange(Message message) {
		MessageType type = message.getMessageType();
		if (type == null) {
			// A message without a type matches none of the handled cases.
			return;
		}
		switch (type) {
			case CONNECT:
				dropLocalDuplicate(message);
				break;
			case SUBSCRIBE:
				applySubscribe(message);
				break;
			case UNSUBSCRIBE:
				applyUnsubscribe(message);
				break;
			case UP_STREAM:
				// MQTT upstream message: deliver it to the listening client(s).
			case DOWN_STREAM:
				// Downstream messages from the HTTP REST API are forwarded here as well.
				dispatchToClient(message.getTopic(), message);
				break;
			default:
				break;
		}
	}

	/**
	 * Sends the message to the given Kafka topic through the injected template.
	 *
	 * @param topic   Kafka topic
	 * @param message message to send
	 */
	@Override
	public void sendToKafka(String topic, Message message) {
		kafkaTemplate.send(topic, message);
	}

	/**
	 * Sends a downstream message to the device(s), using the topic carried by the message.
	 * A message whose topic is blank is logged as an error and dropped.
	 *
	 * @param message message to deliver
	 */
	@Override
	public void sendToClient(Message message) {
		String downTopic = message.getTopic();
		if (!StringUtil.isBlank(downTopic)) {
			dispatchToClient(downTopic, message);
		} else {
			log.error("Mqtt down stream topic is blank.");
		}
	}

	/**
	 * Handles a CONNECT message: when a clientId has connected on another node of the
	 * cluster, removes the connection bound to the same clientId on this node.
	 *
	 * @param message the CONNECT message
	 */
	private void dropLocalDuplicate(Message message) {
		String remoteNode = message.getNode();
		String localNode = mqttServer.getServerCreator().getNodeName();
		if (localNode.equals(remoteNode)) {
			// The message originated from this node; the local connection is the current one.
			return;
		}
		String clientId = message.getClientId();
		ChannelContext localChannel = Tio.getByBsId(mqttServer.getServerConfig(), clientId);
		if (localChannel == null) {
			return;
		}
		String reason = String.format("clientId:[%s] now bind on mqtt node:[%s]", clientId, remoteNode);
		Tio.remove(localChannel, reason);
	}

	/**
	 * Handles a SUBSCRIBE message (HTTP API subscribe broadcast): registers the
	 * subscription only when the originating client has a channel on this server.
	 *
	 * @param message the SUBSCRIBE message
	 */
	private void applySubscribe(Message message) {
		String fromClientId = message.getFromClientId();
		ChannelContext channel = mqttServer.getChannelContext(fromClientId);
		if (channel == null) {
			return;
		}
		sessionManager().addSubscribe(message.getTopic(), fromClientId, message.getQos());
	}

	/**
	 * Handles an UNSUBSCRIBE message (HTTP API unsubscribe broadcast): removes the
	 * subscription only when the originating client has a channel on this server.
	 *
	 * @param message the UNSUBSCRIBE message
	 */
	private void applyUnsubscribe(Message message) {
		String fromClientId = message.getFromClientId();
		ChannelContext channel = mqttServer.getChannelContext(fromClientId);
		if (channel == null) {
			return;
		}
		sessionManager().removeSubscribe(message.getTopic(), fromClientId);
	}

	/**
	 * Looks up the session manager from the server creator.
	 *
	 * @return the session manager of the local server
	 */
	private IMqttSessionManager sessionManager() {
		return mqttServer.getServerCreator().getSessionManager();
	}

	/**
	 * Publishes the message payload on the given topic.
	 * A blank clientId publishes via {@code publishAll}; otherwise the message is
	 * published to that single client.
	 *
	 * @param topic   topic
	 * @param message Message
	 */
	private void dispatchToClient(String topic, Message message) {
		MqttQoS qos = MqttQoS.valueOf(message.getQos());
		String targetClientId = message.getClientId();
		if (!StringUtil.isBlank(targetClientId)) {
			mqttServer.publish(targetClientId, topic, message.getPayload(), qos, message.isRetain());
			return;
		}
		mqttServer.publishAll(topic, message.getPayload(), qos, message.isRetain());
	}

}
